<!DOCTYPE html>












  


<html class="theme-next pisces use-motion" lang="zh-CN">
<head><meta name="generator" content="Hexo 3.8.0">
  <meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=2">
<meta name="theme-color" content="#222">



















  
  
  
  

  
    
    
  

  
    
      
    

    
  

  

  
    
      
    

    
  

  
    
      
    

    
  

  
    
    
    <link rel="stylesheet" href="//fonts.googleapis.com/css?family=Monda:300,300italic,400,400italic,700,700italic|Roboto Slab:300,300italic,400,400italic,700,700italic|Lobster Two:300,300italic,400,400italic,700,700italic|PT Mono:300,300italic,400,400italic,700,700italic&subset=latin,latin-ext">
  






<link rel="stylesheet" href="/lib/font-awesome/css/font-awesome.min.css?v=4.7.0">

<link rel="stylesheet" href="/css/main.css?v=7.1.2">


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=7.1.2">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=7.1.2">


  <link rel="icon" type="image/png" sizes="16x16" href="/favicon.ico?v=7.1.2">


  <link rel="mask-icon" href="/images/logo.svg?v=7.1.2" color="#222">







<script id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '/',
    scheme: 'Pisces',
    version: '7.1.2',
    sidebar: {"position":"left","display":"hide","offset":12,"onmobile":false,"dimmer":false},
    back2top: true,
    back2top_sidebar: false,
    fancybox: false,
    fastclick: false,
    lazyload: false,
    tabs: true,
    motion: {"enable":true,"async":true,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>


  




  <meta name="description" content="前面已经为大家介绍了 Flink 的基本概念以及安装部署的过程，从而希望能够帮助读者建立起对 Flink 的初步印象。本次课程开始，我们将进入第二部分，即 Flink 实际开发的相关内容。本次课程将首先介绍 Flink 开发中比较核心的 DataStream API 。我们首先将回顾分布式流处理的一些基本概念，这些概念对于理解实际的 DataStream API 有非常大的作用。然后，我们将详细介">
<meta name="keywords" content="Flink">
<meta property="og:type" content="article">
<meta property="og:title" content="Apache Flink 零基础入门（三）：DataStream API 编程">
<meta property="og:url" content="https://www.dudefu.tk/Apache Flink 零基础入门（三）：DataStream API 编程.html">
<meta property="og:site_name" content="The Future">
<meta property="og:description" content="前面已经为大家介绍了 Flink 的基本概念以及安装部署的过程，从而希望能够帮助读者建立起对 Flink 的初步印象。本次课程开始，我们将进入第二部分，即 Flink 实际开发的相关内容。本次课程将首先介绍 Flink 开发中比较核心的 DataStream API 。我们首先将回顾分布式流处理的一些基本概念，这些概念对于理解实际的 DataStream API 有非常大的作用。然后，我们将详细介">
<meta property="og:locale" content="zh-CN">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtx3vaoymj30k00b942m.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtx3zwz95j30k00b9abg.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxav5a66j30k00b9aap.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxb1mr5oj30k00b9dgv.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxb8614sj30k00b9mxp.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxa8iaumj30k00b9wft.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxaddsw4j30k00b9q4d.jpg">
<meta property="og:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxajahvbj30k00b93zm.jpg">
<meta property="og:updated_time" content="2021-01-20T01:54:03.764Z">
<meta name="twitter:card" content="summary">
<meta name="twitter:title" content="Apache Flink 零基础入门（三）：DataStream API 编程">
<meta name="twitter:description" content="前面已经为大家介绍了 Flink 的基本概念以及安装部署的过程，从而希望能够帮助读者建立起对 Flink 的初步印象。本次课程开始，我们将进入第二部分，即 Flink 实际开发的相关内容。本次课程将首先介绍 Flink 开发中比较核心的 DataStream API 。我们首先将回顾分布式流处理的一些基本概念，这些概念对于理解实际的 DataStream API 有非常大的作用。然后，我们将详细介">
<meta name="twitter:image" content="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtx3vaoymj30k00b942m.jpg">





  
  
  <link rel="canonical" href="https://www.dudefu.tk/Apache Flink 零基础入门（三）：DataStream API 编程">



<script id="page.configurations">
  CONFIG.page = {
    sidebar: "",
  };
</script>

  <title>Apache Flink 零基础入门（三）：DataStream API 编程 | The Future</title>
  












  <noscript>
  <style>
  .use-motion .motion-element,
  .use-motion .brand,
  .use-motion .menu-item,
  .sidebar-inner,
  .use-motion .post-block,
  .use-motion .pagination,
  .use-motion .comments,
  .use-motion .post-header,
  .use-motion .post-body,
  .use-motion .collection-title { opacity: initial; }

  .use-motion .logo,
  .use-motion .site-title,
  .use-motion .site-subtitle {
    opacity: initial;
    top: initial;
  }

  .use-motion .logo-line-before i { left: initial; }
  .use-motion .logo-line-after i { right: initial; }
  </style>
</noscript>

</head>

<body itemscope="" itemtype="http://schema.org/WebPage" lang="zh-CN">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope="" itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta">
    

    <div class="custom-logo-site-title">
      <a href="/" class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">The Future</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
    
      
        <h1 class="site-subtitle" itemprop="description">Stay hungry,stay foolish.</h1>
      
    
    
  </div>

  <div class="site-nav-toggle">
    <button aria-label="切换导航栏">
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>



<nav class="site-nav">
  
    <ul id="menu" class="menu">
      
        
        
        
          
          <li class="menu-item menu-item-home">

    
    
    
      
    

    
      
    

    <a href="/" rel="section"><i class="menu-item-icon fa fa-fw fa-home"></i> <br>首页</a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-archives">

    
    
    
      
    

    
      
    

    <a href="/archives/" rel="section"><i class="menu-item-icon fa fa-fw fa-archive"></i> <br>归档<span class="badge">125</span></a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-categories">

    
    
    
      
    

    
      
    

    <a href="/categories" rel="section"><i class="menu-item-icon fa fa-fw fa-th"></i> <br>分类<span class="badge">15</span></a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-tags">

    
    
    
      
    

    
      
    

    <a href="/tags" rel="section"><i class="menu-item-icon fa fa-fw fa-tags"></i> <br>标签<span class="badge">63</span></a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-something">

    
    
    
      
    

    
      
    

    <a href="/something" rel="section"><i class="menu-item-icon fa fa-fw fa-paper-plane"></i> <br>干货</a>

  </li>
        
        
        
          
          <li class="menu-item menu-item-about">

    
    
    
      
    

    
      
    

    <a href="/about/" rel="section"><i class="menu-item-icon fa fa-fw fa-user"></i> <br>关于</a>

  </li>

      
      
        <li class="menu-item menu-item-search">
          
            <a href="javascript:;" class="popup-trigger">
          
            
              <i class="menu-item-icon fa fa-search fa-fw"></i> <br>搜索</a>
        </li>
      
    </ul>
  

  

  
    <div class="site-search">
      
  <div class="popup search-popup local-search-popup">
  <div class="local-search-header clearfix">
    <span class="search-icon">
      <i class="fa fa-search"></i>
    </span>
    <span class="popup-btn-close">
      <i class="fa fa-times-circle"></i>
    </span>
    <div class="local-search-input-wrapper">
      <input autocomplete="off" placeholder="搜索..." spellcheck="false" type="text" id="local-search-input">
    </div>
  </div>
  <div id="local-search-result"></div>
</div>



    </div>
  
</nav>



  



</div>
    </header>

    


    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          
            

          
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  
    <div class="reading-progress-bar"></div>
  

  <article class="post post-type-normal" itemscope="" itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.dudefu.tk/Apache Flink 零基础入门（三）：DataStream API 编程.html">

    <span hidden itemprop="author" itemscope="" itemtype="http://schema.org/Person">
      <meta itemprop="name" content="Daniel X">
      <meta itemprop="description" content="專注于大数据技術，分享干货">
      <meta itemprop="image" content="https://hexoblog-1254111960.cos.ap-guangzhou.myqcloud.com/HexoBlog-tou.jpg">
    </span>

    <span hidden itemprop="publisher" itemscope="" itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="The Future">
    </span>

    
      <header class="post-header">

        
        
          <h2 class="post-title" itemprop="name headline">Apache Flink 零基础入门（三）：DataStream API 编程

              
            
          </h2>
        

        <div class="post-meta">
          <span class="post-time">

            
            
            

            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              

              
                
              

              <time title="创建时间：2021-01-19 14:13:34" itemprop="dateCreated datePublished" datetime="2021-01-19T14:13:34+08:00">2021-01-19</time>
            

            
              

              
                
                <span class="post-meta-divider">|</span>
                

                <span class="post-meta-item-icon">
                  <i class="fa fa-calendar-check-o"></i>
                </span>
                
                  <span class="post-meta-item-text">更新于</span>
                
                <time title="修改时间：2021-01-20 09:54:03" itemprop="dateModified" datetime="2021-01-20T09:54:03+08:00">2021-01-20</time>
              
            
          </span>

          
            <span class="post-category">
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope="" itemtype="http://schema.org/Thing"><a href="/categories/大数据/" itemprop="url" rel="index"><span itemprop="name">大数据</span></a></span>

                
                
              
            </span>
          

          
            
            
              
              <span class="post-comments-count">
                <span class="post-meta-divider">|</span>
                <span class="post-meta-item-icon">
                  <i class="fa fa-comment-o"></i>
                </span>
            
                <span class="post-meta-item-text">评论数：</span>
                <a href="/Apache Flink 零基础入门（三）：DataStream API 编程.html#comments" itemprop="discussionUrl">
                  <span class="post-comments-count valine-comment-count" data-xid="/Apache Flink 零基础入门（三）：DataStream API 编程.html" itemprop="commentCount"></span>
                </a>
              </span>
            
          

          
          
            <span id="/Apache Flink 零基础入门（三）：DataStream API 编程.html" class="leancloud_visitors" data-flag-title="Apache Flink 零基础入门（三）：DataStream API 编程">
              <span class="post-meta-divider">|</span>
              <span class="post-meta-item-icon">
                <i class="fa fa-eye"></i>
              </span>
              
                <span class="post-meta-item-text">阅读次数：</span>
              
                <span class="leancloud-visitors-count"></span>
            </span>
          

          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <p>前面已经为大家介绍了<span class="exturl" data-url="aHR0cHM6Ly95cS5hbGl5dW4uY29tL2FydGljbGVzLzcwODc2OD9zcG09YTJjNGUuMTExNTU0MzUuMC4wLjI2YWYzMzEyVkNheTJk" title="https://yq.aliyun.com/articles/708768?spm=a2c4e.11155435.0.0.26af3312VCay2d"> Flink 的基本概念以及安装部署的过程<i class="fa fa-external-link"></i></span>，从而希望能够帮助读者建立起对 Flink 的初步印象。本次课程开始，我们将进入第二部分，即 Flink 实际开发的相关内容。本次课程将首先介绍 Flink 开发中比较核心的 DataStream API 。我们首先将回顾分布式流处理的一些基本概念，这些概念对于理解实际的 DataStream API 有非常大的作用。然后，我们将详细介绍 DataStream API 的设计，最后我们将通过一个例子来演示 DataStream API 的使用。</p>
<a id="more"></a>
<h3 id="1-流处理基本概念"><a href="#1-流处理基本概念" class="headerlink" title="1. 流处理基本概念"></a>1. 流处理基本概念</h3><p>对于什么是流处理，从不同的角度有不同的定义。其实流处理与批处理这两个概念是对立统一的，它们的关系有点类似于对于 Java 中的 ArrayList 中的元素，是直接看作一个有限数据集并用下标去访问，还是用迭代器去访问。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtx3vaoymj30k00b942m.jpg" alt="_04"></p>
<blockquote>
<p>图1. 左图硬币分类器。硬币分类器也可以看作一个流处理系统，用于硬币分类的各部分组件提前串联在一起，硬币不断进入系统，并最终被输出到不同的队列中供后续使用。右图同理。</p>
</blockquote>
<p>流处理系统本身有很多自己的特点。一般来说，由于需要支持无限数据集的处理，流处理系统一般采用一种数据驱动的处理方式。它会提前设置一些算子，然后等到数据到达后对数据进行处理。为了表达复杂的计算逻辑，包括 Flink 在内的分布式流处理引擎一般采用 DAG 图来表示整个计算逻辑，其中 DAG 图中的每一个点就代表一个基本的逻辑单元，也就是前面说的算子。由于计算逻辑被组织成有向图，数据会按照边的方向，从一些特殊的 Source 节点流入系统，然后通过网络传输、本地传输等不同的数据传输方式在算子之间进行发送和处理，最后会通过另外一些特殊的 Sink 节点将计算结果发送到某个外部系统或数据库中。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtx3zwz95j30k00b9abg.jpg" alt="_05"></p>
<blockquote>
<p>图2. 一个 DAG 计算逻辑图与实际的物理时模型。逻辑图中的每个算子在物理图中可能有多个并发。</p>
</blockquote>
<p>对于实际的分布式流处理引擎，它们的实际运行时物理模型要更复杂一些，这是由于每个算子都可能有多个实例。如图 2 所示，作为 Source 的 A 算子有两个实例，中间算子 C 也有两个实例。在逻辑模型中，A 和 B 是 C 的上游节点，而在对应的物理逻辑中，C 的所有实例和 A、B 的所有实例之间可能都存在数据交换。在物理模型中，我们会根据计算逻辑，采用系统自动优化或人为指定的方式将计算工作分布到不同的实例中。只有当算子实例分布到不同进程上时，才需要通过网络进行数据传输，而同一进程中的多个实例之间的数据传输通常是不需要通过网络的。</p>
<blockquote>
<p>表1. Apache Storm 构造 DAG 计算图。Apache Storm 的接口定义更加“面向操作”，因此更加底层。</p>
</blockquote>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line">TopologyBuilder builder = new TopologyBuilder();</span><br><span class="line"></span><br><span class="line">builder.setSpout(&quot;spout&quot;, new RandomSentenceSpout(), 5);</span><br><span class="line">builder.setBolt(&quot;split&quot;, new SplitSentence(), 8).shuffleGrouping(&quot;spout&quot;);</span><br><span class="line">builder.setBolt(&quot;count&quot;, new WordCount(), 12).fieldsGrouping(&quot;split&quot;, new Fields(&quot;word&quot;));</span><br></pre></td></tr></table></figure>
<blockquote>
<p>表2. Apache Flink 构造 DAG 计算图。Apache Flink 的接口定义更加“面向数据”，因此更加高层。</p>
</blockquote>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line">StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();</span><br><span class="line"></span><br><span class="line">DataStream&lt;String&gt; text = env.readTextFile (&quot;input&quot;);</span><br><span class="line">DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);</span><br><span class="line">counts.writeAsText(&quot;output&quot;);</span><br></pre></td></tr></table></figure>
<p>由于流处理的计算逻辑是通过 DAG 图来表示的，因此它们的大部分 API 都是围绕构建这种计算逻辑图来设计的。例如，对于几年前非常流行的 Apache Storm，它的 Word Count 的示例如表 1 所示。基于 Apache Storm 用户需要在图中添加 Spout 或 Bolt 这种算子，并指定算子之前的连接方式。这样，在完成整个图的构建之后，就可以将图提交到远程或本地集群运行。</p>
<p>与之对比，Apache Flink 的接口虽然也是在构建计算逻辑图，但是 Flink 的 API 定义更加面向数据本身的处理逻辑，它把数据流抽象成为一个无限集，然后定义了一组集合上的操作，然后在底层自动构建相应的 DAG 图。可以看出，<strong>Flink 的 API 要更“上层”一些。许多研究者在进行实验时，可能会更喜欢自由度高的 Storm，因为它更容易保证实现预想的图结构；而在工业界则更喜欢 Flink 这类高级 API，因为它使用更加简单。</strong></p>
<h3 id="2-Flink-DataStream-API-概览"><a href="#2-Flink-DataStream-API-概览" class="headerlink" title="2. Flink DataStream API 概览"></a>2. Flink DataStream API 概览</h3><p>基于前面对流处理的基本概念，本节将详细介绍 Flink DataStream API 的使用方式。我们首先还是从一个简单的例子开始看起。表3是一个流式 Word Count 的示例，虽然它只有 5 行代码，但是它给出了基于 Flink DataStream API 开发程序的基本结构。</p>
<blockquote>
<p>表3. 基于 Flink DataStream API 的 Word Count 示例.</p>
</blockquote>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line">//1、设置运行环境</span><br><span class="line">StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();</span><br><span class="line">//2、配置数据源读取数据</span><br><span class="line">DataStream&lt;String&gt; text = env.readTextFile (&quot;input&quot;);</span><br><span class="line">//3、进行一系列转换</span><br><span class="line">DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);</span><br><span class="line">//4、配置数据汇写出数据</span><br><span class="line">counts.writeAsText(&quot;output&quot;);</span><br><span class="line">//5、提交执行</span><br><span class="line">env.execute(&quot;Streaming WordCount&quot;);</span><br></pre></td></tr></table></figure>
<p>为了实现流式 Word Count，我们首先要先获得一个 StreamExecutionEnvironment 对象。它是我们构建图过程中的上下文对象。基于这个对象，我们可以添加一些算子。对于流处理程度，我们一般需要首先创建一个数据源去接入数据。在这个例子中，我们使用了 Environment 对象中内置的读取文件的数据源。这一步之后，我们拿到的是一个 DataStream 对象，它可以看作一个无限的数据集，可以在该集合上进行一序列的操作。例如，在 Word Count 例子中，我们首先将每一条记录（即文件中的一行）分隔为单词，这是通过 FlatMap 操作来实现的。调用 FlatMap 将会在底层的 DAG 图中添加一个 FlatMap 算子。然后，我们得到了一个记录是单词的流。我们将流中的单词进行分组（keyBy），然后累积计算每一个单词的数据（sum(1)）。计算出的单词的数据组成了一个新的流，我们将它写入到输出文件中。</p>
<p>最后，我们需要调用 env#execute 方法来开始程序的执行。需要强调的是，前面我们调用的所有方法，都不是在实际处理数据，而是在构通表达计算逻辑的 DAG 图。只有当我们将整个图构建完成并显式的调用 Execute 方法后，框架才会把计算图提供到集群中，接入数据并执行实际的逻辑。</p>
<p>基于流式 Word Count 的例子可以看出，基于 Flink 的 DataStream API 来编写流处理程序一般需要三步：通过 Source 接入数据、进行一系统列的处理以及将数据写出。最后，不要忘记显式调用 Execute 方式，否则前面编写的逻辑并不会真正执行。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxav5a66j30k00b9aap.jpg" alt="_09"></p>
<blockquote>
<p>图3. Flink DataStream 操作概览</p>
</blockquote>
<p>从上面的例子中还可以看出，Flink DataStream API 的核心，就是代表流数据的 DataStream 对象。整个计算逻辑图的构建就是围绕调用 DataStream 对象上的不同操作产生新的 DataStream 对象展开的。整体来说，DataStream 上的操作可以分为四类。第一类是对于单条记录的操作，比如筛除掉不符合要求的记录（Filter 操作），或者将每条记录都做一个转换（Map 操作）。第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量，就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作，就得通过 Window 将需要的记录关联到一起进行处理。第三类是对多个流进行操作并转换为单个流。例如，多个流可以通过 Union、Join 或 Connect 等操作合到一起。这些操作合并的逻辑不同，但是它们最终都会产生了一个新的统一的流，从而可以进行一些跨流的操作。最后， DataStream 还支持与合并对称的操作，即把一个流按一定规则拆分为多个流（Split 操作），每个流是之前流的一个子集，这样我们就可以对不同的流作不同的处理。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxb1mr5oj30k00b9dgv.jpg" alt="_10"></p>
<blockquote>
<p>图4. 不同类型的 DataStream 子类型。不同的子类型支持不同的操作集合。</p>
</blockquote>
<p>为了支持这些不同的流操作，Flink 引入了一组不同的流类型，用来表示某些操作的中间流数据集类型。完整的类型转换关系如图4所示。首先，对于一些针对单条记录的操作，如 Map 等，操作的结果仍然是是基本的 DataStream 类型。然后，对于 Split 操作，它会首先产生一个 SplitStream，基于 SplitStream 可以使用 Select 方法来筛选出符合要求的记录并再将得到一个基本的流。</p>
<p>类似的，对于 Connect 操作，在调用 streamA.connect(streamB)后可以得到一个专门的 ConnectedStream。ConnectedStream 支持的操作与普通的 DataStream 有所区别，由于它代表两个不同的流混合的结果，因此它允许用户对两个流中的记录分别指定不同的处理逻辑，然后它们的处理结果形成一个新的 DataStream 流。由于不同记录的处理是在同一个算子中进行的，因此它们在处理时可以方便的共享一些状态信息。上层的一些 Join 操作，在底层也是需要依赖于 Connect 操作来实现的。</p>
<p>另外，如前所述，我们可以通过 Window 操作对流可以按时间或者个数进行一些切分，从而将流切分成一个个较小的分组。具体的切分逻辑可以由用户进行选择。当一个分组中所有记录都到达后，用户可以拿到该分组中的所有记录，从而可以进行一些遍历或者累加操作。这样，对每个分组的处理都可以得到一组输出数据，这些输出数据形成了一个新的基本流。</p>
<p>对于普通的 DataStream，我们必须使用 allWindow 操作，它代表对整个流进行统一的 Window 处理，因此是不能使用多个算子实例进行同时计算的。针对这一问题，就需要我们首先使用 KeyBy 方法对记录按 Key 进行分组，然后才可以并行的对不同 Key 对应的记录进行单独的 Window 操作。KeyBy 操作是我们日常编程中最重要的操作之一，下面我们会更详细的介绍。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxb8614sj30k00b9mxp.jpg" alt="_11"></p>
<blockquote>
<p>图5. 基本流上的 Window 操作与 KeyedStream 上的 Window 操对比。KeyedStream 上的 Window 操作使采用多个实例并发处理成为了可能。</p>
</blockquote>
<p>基本 DataStream 对象上的 allWindow 与 KeyedStream 上的 Window 操作的对比如图5所示。为了能够在多个并发实例上并行的对数据进行处理，我们需要通过 KeyBy 将数据进行分组。KeyBy 和 Window 操作都是对数据进行分组，但是 KeyBy 是在水平分向对流进行切分，而 Window 是在垂直方式对流进行切分。</p>
<p>使用 KeyBy 进行数据切分之后，后续算子的每一个实例可以只处理特定 Key 集合对应的数据。除了处理本身外，Flink 中允许算子维护一部分状态（State），在KeyedStream 算子的状态也是可以分布式存储的。由于 KeyBy 是一种确定的数据分配方式（下文将介绍其它分配方式），因此即使发生 Failover 作业重启，甚至发生了并发度的改变，Flink 都可以重新分配 Key 分组并保证处理某个 Key 的分组一定包含该 Key 的状态，从而保证一致性。</p>
<p>最后需要强调的是，KeyBy 操作只有当 Key 的数量超过算子的并发实例数才可以较好的工作。由于同一个 Key 对应的所有数据都会发送到同一个实例上，因此如果Key 的数量比实例数量少时，就会导致部分实例收不到数据，从而导致计算能力不能充分发挥。</p>
<h3 id="3-其它问题"><a href="#3-其它问题" class="headerlink" title="3. 其它问题"></a>3. 其它问题</h3><p>除 KeyBy 之外，Flink 在算子之前交换数据时还支持其它的物理分组方式。如图 1 所示，Flink DataStream 中物理分组方式包括：</p>
<ul>
<li>Global: 上游算子将所有记录发送给下游算子的第一个实例。</li>
<li>Broadcast: 上游算子将每一条记录发送给下游算子的所有实例。</li>
<li>Forward：只适用于上游算子实例数与下游算子相同时，每个上游算子实例将记录发送给下游算子对应的实例。</li>
<li>Shuffle：上游算子对每条记录随机选择一个下游算子进行发送。</li>
<li>Rebalance：上游算子通过轮询的方式发送数据。</li>
<li>Rescale：当上游和下游算子的实例数为 n 或 m 时，如果 n &lt; m，则每个上游实例向ceil(m/n)或floor(m/n)个下游实例轮询发送数据；如果 n &gt; m，则 floor(n/m) 或 ceil(n/m) 个上游实例向下游实例轮询发送数据。</li>
<li>PartitionCustomer：当上述内置分配方式不满足需求时，用户还可以选择自定义分组方式。</li>
</ul>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxa8iaumj30k00b9wft.jpg" alt="_13"></p>
<blockquote>
<p>图6. 除keyBy外其它的物理分组方式。</p>
</blockquote>
<p>除分组方式外，Flink DataStream API 中另一个重要概念就是类型系统。图 7 所示，Flink DataStream 对像都是强类型的，每一个 DataStream 对象都需要指定元素的类型，Flink 自己底层的序列化机制正是依赖于这些信息对序列化等进行优化。具体来说，在 Flink 底层，它是使用 TypeInformation 对象对类型进行描述的，TypeInformation 对象定义了一组类型相关的信息供序列化框架使用。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxaddsw4j30k00b9q4d.jpg" alt="_14"></p>
<blockquote>
<p>图7. Flink DataStream API 中的类型系统</p>
</blockquote>
<p>Flink 内置了一部分常用的基本类型，对于这些类型，Flink 也内置了它们的TypeInformation，用户一般可以直接使用而不需要额外的声明，Flink 自己可以通过类型推断机制识别出相应的类型。但是也会有一些例外的情况，比如，Flink DataStream API 同时支持 Java 和 Scala，Scala API 许多接口是通过隐式的参数来传递类型信息的，所以如果需要通过 Java 调用 Scala 的 API，则需要把这些类型信息通过隐式参数传递过去。另一个例子是 Java 中对泛型存在类型擦除，如果流的类型本身是一个泛型的话，则可能在擦除之后无法推断出类型信息，这时候也需要显式的指定。</p>
<p>在 Flink 中，一般 Java 接口采用 Tuple 类型来组合多个字段，而 Scala 则更经常使用 Row 类型或 Case Class。相对于 Row，Tuple 类型存在两个问题，一个是字段个数不能超过 25 个，此外，所有字段不允许有 null 值。最后，Flink 也支持用户自定义新的类型和 TypeInformation，并通过 Kryo 来实现序列化，但是这种方式可带来一些迁移等方面的问题，所以尽量不要使用自定义的类型。</p>
<h3 id="4-示例"><a href="#4-示例" class="headerlink" title="4.示例"></a>4.示例</h3><p>然后，我们再看一个更复杂的例子。假设我们有一个数据源，它监控系统中订单的情况，当有新订单时，它使用 Tuple2 输出订单中商品的类型和交易额。然后，我们希望实时统计每个类别的交易额，以及实时统计全部类别的交易额。</p>
<p>表4. 实时订单统计示例。</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br></pre></td><td class="code"><pre><span class="line">public class GroupedProcessingTimeWindowSample &#123;</span><br><span class="line">    private static class DataSource extends RichParallelSourceFunction&lt;Tuple2&lt;String, Integer&gt;&gt; &#123;</span><br><span class="line">        private volatile boolean isRunning = true;</span><br><span class="line"></span><br><span class="line">        @Override</span><br><span class="line">        public void run(SourceContext&lt;Tuple2&lt;String, Integer&gt;&gt; ctx) throws Exception &#123;</span><br><span class="line">            Random random = new Random();</span><br><span class="line">            while (isRunning) &#123;</span><br><span class="line">                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);</span><br><span class="line">                String key = &quot;类别&quot; + (char) (&apos;A&apos; + random.nextInt(3));</span><br><span class="line">                int value = random.nextInt(10) + 1;</span><br><span class="line"></span><br><span class="line">                System.out.println(String.format(&quot;Emits\t(%s, %d)&quot;, key, value));</span><br><span class="line">                ctx.collect(new Tuple2&lt;&gt;(key, value));</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">        @Override</span><br><span class="line">        public void cancel() &#123;</span><br><span class="line">            isRunning = false;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">    public static void main(String[] args) throws Exception &#123;</span><br><span class="line">        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();</span><br><span class="line">        env.setParallelism(2);</span><br><span class="line"></span><br><span class="line">        DataStream&lt;Tuple2&lt;String, Integer&gt;&gt; ds = env.addSource(new DataSource());</span><br><span class="line">        KeyedStream&lt;Tuple2&lt;String, Integer&gt;, Tuple&gt; keyedStream = ds.keyBy(0);</span><br><span class="line"></span><br><span class="line">        keyedStream.sum(1).keyBy(new KeySelector&lt;Tuple2&lt;String, Integer&gt;, Object&gt;() &#123;</span><br><span class="line">            @Override</span><br><span class="line">            public Object getKey(Tuple2&lt;String, Integer&gt; stringIntegerTuple2) throws Exception &#123;</span><br><span class="line">                return &quot;&quot;;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;).fold(new HashMap&lt;String, Integer&gt;(), new FoldFunction&lt;Tuple2&lt;String, Integer&gt;, HashMap&lt;String, Integer&gt;&gt;() &#123;</span><br><span class="line">            @Override</span><br><span class="line">            public HashMap&lt;String, Integer&gt; fold(HashMap&lt;String, Integer&gt; accumulator, Tuple2&lt;String, Integer&gt; value) throws Exception &#123;</span><br><span class="line">                accumulator.put(value.f0, value.f1);</span><br><span class="line">                return accumulator;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;).addSink(new SinkFunction&lt;HashMap&lt;String, Integer&gt;&gt;() &#123;</span><br><span class="line">            @Override</span><br><span class="line">            public void invoke(HashMap&lt;String, Integer&gt; value, Context context) throws Exception &#123;</span><br><span class="line">                  // 每个类型的商品成交量</span><br><span class="line">                  System.out.println(value);</span><br><span class="line">                  // 商品成交总量                </span><br><span class="line">                  System.out.println(value.values().stream().mapToInt(v -&gt; v).sum());</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;);</span><br><span class="line"></span><br><span class="line">        env.execute();</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>示例的实现如表4所示。首先，在该实现中，我们首先实现了一个模拟的数据源，它继承自 RichParallelSourceFunction，它是可以有多个实例的 SourceFunction 的接口。它有两个方法需要实现，一个是 Run 方法，Flink 在运行时对 Source 会直接调用该方法，该方法需要不断的输出数据，从而形成初始的流。在 Run 方法的实现中，我们随机的产生商品类别和交易量的记录，然后通过 ctx#collect 方法进行发送。另一个方法是 Cancel 方法，当 Flink 需要 Cancel Source Task 的时候会调用该方法，我们使用一个 Volatile 类型的变量来标记和控制执行的状态。</p>
<p>然后，我们在 Main 方法中就可以开始图的构建。我们首先创建了一个 StreamExecutioniEnviroment 对象。创建对象调用的 getExecutionEnvironment 方法会自动判断所处的环境，从而创建合适的对象。例如，如果我们在 IDE 中直接右键运行，则会创建 LocalStreamExecutionEnvironment 对象；如果是在一个实际的环境中，则会创建 RemoteStreamExecutionEnvironment 对象。</p>
<p>基于 Environment 对象，我们首先创建了一个 Source，从而得到初始的&lt;商品类型，成交量&gt;流。然后，为了统计每种类别的成交量，我们使用 KeyBy 按 Tuple 的第 1 个字段（即商品类型）对输入流进行分组，并对每一个 Key 对应的记录的第 2 个字段（即成交量）进行求合。在底层，Sum 算子内部会使用 State 来维护每个Key（即商品类型）对应的成交量之和。当有新记录到达时，Sum 算子内部会更新所维护的成交量之和，并输出一条&lt;商品类型，更新后的成交量&gt;记录。</p>
<p>如果只统计各个类型的成交量，则程序可以到此为止，我们可以直接在 Sum 后添加一个 Sink 算子对不断更新的各类型成交量进行输出。但是，我们还需要统计所有类型的总成交量。为了做到这一点，我们需要将所有记录输出到同一个计算节点的实例上。我们可以通过 KeyBy 并且对所有记录返回同一个 Key，将所有记录分到同一个组中，从而可以全部发送到同一个实例上。</p>
<p>然后，我们使用 Fold 方法来在算子中维护每种类型商品的成交量。注意虽然目前 Fold 方法已经被标记为 Deprecated，但是在 DataStream API 中暂时还没有能替代它的其它操作，所以我们仍然使用 Fold 方法。这一方法接收一个初始值，然后当后续流中每条记录到达的时候，算子会调用所传递的 FoldFunction 对初始值进行更新，并发送更新后的值。我们使用一个 HashMap 来对各个类别的当前成交量进行维护，当有一条新的&lt;商品类别，成交量&gt;到达时，我们就更新该 HashMap。这样在 Sink 中，我们收到的是最新的商品类别和成交量的 HashMap，我们可以依赖这个值来输出各个商品的成交量和总的成交量。</p>
<p>需要指出的是，这个例子主要是用来演示 DataStream API 的用法，实际上还会有更高效的写法，此外，更上层的 Table / SQL 还支持 Retraction 机制，可以更好的处理这种情况。</p>
<p><img src="https://tva1.sinaimg.cn/large/008eGmZEgy1gmtxajahvbj30k00b93zm.jpg" alt="_17"></p>
<blockquote>
<p>图8. API 原理图</p>
</blockquote>
<p>最后，我们对 DataStream API 的原理进行简要的介绍。当我们调用 DataStream#map 算法时，Flink 在底层会创建一个 Transformation 对象，这一对象就代表我们计算逻辑图中的节点。它其中就记录了我们传入的 MapFunction，也就是 UDF（User Define Function）。随着我们调用更多的方法，我们创建了更多的 DataStream 对象，每个对象在内部都有一个 Transformation 对象，这些对象根据计算依赖关系组成一个图结构，就是我们的计算图。后续 Flink 将对这个图结构进行进一步的转换，从而最终生成提交作业所需要的 JobGraph。</p>
<h3 id="5-总结"><a href="#5-总结" class="headerlink" title="5. 总结"></a>5. 总结</h3><p>本文主要介绍了 Flink DataStream API，它是当前 Flink 中比较底层的一套 API。在实际的开发中，基于该 API 需要用户自己处理 State 与 Time 等一些概念，因此需要较大的工作量。后续课程还会介绍更上层的 Table / SQL 层的 API，未来 Table / SQL 可能会成为 Flink 主流的 API，但是对于接口来说，越底层的接口表达能力越强，在一些需要精细操作的情况下，仍然需要依赖于 DataStream API。</p>
<p><span class="exturl" data-url="aHR0cHM6Ly9kZXZlbG9wZXIuYWxpeXVuLmNvbS9hcnRpY2xlLzcxMjcwMz9zcG09YTJjNmguMTI4NzM2MzkuMC4wLjUyMDFmNTUzR3p0aE9hJmFtcDtncm91cENvZGU9c2MmYW1wO2FjY291bnR0cmFjZWlkPTczNmY2NDBhNjQwYjQzYTc5N2Q0NmViMzU0MDM0ZWQwYnZ5cg==" title="https://developer.aliyun.com/article/712703?spm=a2c6h.12873639.0.0.5201f553GzthOa&amp;groupCode=sc&amp;accounttraceid=736f640a640b43a797d46eb354034ed0bvyr">原文地址<i class="fa fa-external-link"></i></span></p>

      
    </div>

    

    
    
    

    

    
      
    
    

    

    <footer class="post-footer">
      
        
          
        
        <div class="post-tags">
          
            <a href="/tags/Flink/" rel="tag"><i class="fa fa-tag"></i> Flink</a>
          
        </div>
      

      
      
        <div class="post-widgets">
        

        

        
          
          <div class="social_share">
            
              <div>
                

<script src="//cdn.jsdelivr.net/npm/ilyabirman-likely@2/release/likely.js"></script>



<link rel="stylesheet" href="//cdn.jsdelivr.net/npm/ilyabirman-likely@2/release/likely.css">


  


<div class="likely likely-light">
	
 	 	<div class="twitter">Tweet</div>
	
 	 	<div class="facebook">Share</div>
	
 	 	<div class="linkedin">Link</div>
	
 	 	<div class="gplus">Plus</div>
	
 	 	<div class="vkontakte">Share</div>
	
 	 	<div class="odnoklassniki">Class</div>
	
 	 	<div class="telegram">Send</div>
	
 	 	<div class="whatsapp">Send</div>
	
 	 	<div class="pinterest">Pin</div>
	
</div>

              </div>
            
            
            
              <div>
                
  <div class="bdsharebuttonbox">
    <a href="#" class="bds_tsina" data-cmd="tsina" title="分享到新浪微博"></a>
    <a href="#" class="bds_douban" data-cmd="douban" title="分享到豆瓣网"></a>
    <a href="#" class="bds_sqq" data-cmd="sqq" title="分享到QQ好友"></a>
    <a href="#" class="bds_qzone" data-cmd="qzone" title="分享到QQ空间"></a>
    <a href="#" class="bds_weixin" data-cmd="weixin" title="分享到微信"></a>
    <a href="#" class="bds_tieba" data-cmd="tieba" title="分享到百度贴吧"></a>
    <a href="#" class="bds_twi" data-cmd="twi" title="分享到Twitter"></a>
    <a href="#" class="bds_fbook" data-cmd="fbook" title="分享到Facebook"></a>
    <a href="#" class="bds_more" data-cmd="more"></a>
    <a class="bds_count" data-cmd="count"></a>
  </div>
  <script>
    window._bd_share_config = {
      "common": {
        "bdText": "",
        "bdMini": "2",
        "bdMiniList": false,
        "bdPic": ""
      },
      "share": {
        "bdSize": "16",
        "bdStyle": "0"
      },
      "image": {
        "viewList": ["tsina", "douban", "sqq", "qzone", "weixin", "twi", "fbook"],
        "viewText": "分享到：",
        "viewSize": "16"
      }
    }
  </script>

<script>
  with(document)0[(getElementsByTagName('head')[0]||body).appendChild(createElement('script')).src='//bdimg.share.baidu.com/static/api/js/share.js?cdnversion='+~(-new Date()/36e5)];
</script>

              </div>
            
          </div>
        
        </div>
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/Apache Flink 零基础入门（二）：开发环境搭建和应用的配置、部署及运行.html" rel="next" title="Apache Flink 零基础入门（二）：开发环境搭建和应用的配置、部署及运行">
                <i class="fa fa-chevron-left"></i> Apache Flink 零基础入门（二）：开发环境搭建和应用的配置、部署及运行
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/零基础入门：从0到1学会apache Flink.html" rel="prev" title="零基础入门：从0到1学会apache Flink">
                零基础入门：从0到1学会apache Flink <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>


  </div>


          </div>
          

  
    <div class="comments" id="comments">
      <div id="lv-container" data-id="city" data-uid="MTAyMC8yOTk3My82NTM4"></div>
    </div>

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <div class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope="" itemtype="http://schema.org/Person">
            
              <img class="site-author-image" itemprop="image" src="https://hexoblog-1254111960.cos.ap-guangzhou.myqcloud.com/HexoBlog-tou.jpg" alt="Daniel X">
            
              <p class="site-author-name" itemprop="name">Daniel X</p>
              <div class="site-description motion-element" itemprop="description">專注于大数据技術，分享干货</div>
          </div>

          
            <nav class="site-state motion-element">
              
                <div class="site-state-item site-state-posts">
                
                  <a href="/archives/">
                
                    <span class="site-state-item-count">125</span>
                    <span class="site-state-item-name">日志</span>
                  </a>
                </div>
              

              
                
                
                <div class="site-state-item site-state-categories">
                  
                    
                      <a href="/categories">
                    
                  
                    
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                    <span class="site-state-item-count">15</span>
                    <span class="site-state-item-name">分类</span>
                  </a>
                </div>
              

              
                
                
                <div class="site-state-item site-state-tags">
                  
                    
                      <a href="/tags">
                    
                  
                    
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                      
                    
                    <span class="site-state-item-count">63</span>
                    <span class="site-state-item-name">标签</span>
                  </a>
                </div>
              
            </nav>
          

          

          

          
            <div class="links-of-author motion-element">
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="aHR0cHM6Ly9naXRodWIuY29tL2R1ZGVmdQ==" title="GitHub &rarr; https://github.com/dudefu"><i class="fa fa-fw fa-github"></i>GitHub</span>
                </span>
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="bWFpbHRvOmR1ZGVmdUBmb3htYWlsLmNvbT9zdWJqZWN0PUhlbGxvJTIwYWdhaW4=" title="E-mail &rarr; mailto:dudefu@foxmail.com?subject=Hello%20again"><i class="fa fa-fw fa-envelope"></i>E-mail</span>
                </span>
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="aHR0cHM6Ly93ZWliby5jb20vZHVkZWZ1" title="Weibo &rarr; https://weibo.com/dudefu"><i class="fa fa-fw fa-weibo"></i>Weibo</span>
                </span>
              
                <span class="links-of-author-item">
                  
                  
                    
                  
                  
                    
                  
                  <span class="exturl" data-url="aHR0cHM6Ly93cGEucXEuY29tL21zZ3JkP3Y9MyZ1aW49MTU3NzU3MTk1OSZzaXRlPWR1ZGVmdS5pbmZvJm1lbnU9eWVz" title="QQ &rarr; https://wpa.qq.com/msgrd?v=3&uin=1577571959&site=dudefu.info&menu=yes"><i class="fa fa-fw fa-qq"></i>QQ</span>
                </span>
              
            </div>
          

          

          
          

          
            
          
          

        </div>
      </div>

      
      <!--noindex-->
        <div class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
            
            
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-3"><a class="nav-link" href="#1-流处理基本概念"><span class="nav-number">1.</span> <span class="nav-text">1. 流处理基本概念</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-Flink-DataStream-API-概览"><span class="nav-number">2.</span> <span class="nav-text">2. Flink DataStream API 概览</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#3-其它问题"><span class="nav-number">3.</span> <span class="nav-text">3. 其它问题</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#4-示例"><span class="nav-number">4.</span> <span class="nav-text">4.示例</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#5-总结"><span class="nav-number">5.</span> <span class="nav-text">5. 总结</span></a></li></ol></div>
            

          </div>
        </div>
      <!--/noindex-->
      

      

    </div>
  </aside>
  


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">  <span class="exturl" data-url="aHR0cDovL3d3dy5iZWlhbi5taWl0Lmdvdi5jbg==">粤ICP备18110871号 </span>&copy; 2017 – <span itemprop="copyrightYear">2021</span>
  <span class="with-love" id="animate">
    <i class="fa fa-spinner"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">dudefu</span>

  

  
</div>

<!--

  <div class="powered-by">由 <span class="exturl theme-link" data-url="aHR0cHM6Ly9oZXhvLmlv">Hexo</span> 强力驱动 v3.8.0</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 – <span class="exturl theme-link" data-url="aHR0cHM6Ly90aGVtZS1uZXh0Lm9yZw==">NexT.Pisces</span> v7.1.2</div>

-->



        








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
          <span id="scrollpercent"><span>0</span>%</span>
        
      </div>
    

    

    

    
  </div>

  

<script>
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>














  
    
    
      
    
  
  <script color="26,26,26" opacity="0.5" zindex="-1" count="99" src="//cdn.jsdelivr.net/gh/theme-next/theme-next-canvas-nest@1/canvas-nest.min.js"></script>









  
  
  <script id="ribbon" size="300" alpha="0.6" zindex="-1" src="/lib/canvas-ribbon/canvas-ribbon.js"></script>



  



  
  <script src="/lib/jquery/index.js?v=3.4.1"></script>

  
  <script src="/lib/velocity/velocity.min.js?v=1.2.1"></script>

  
  <script src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>

  
  <script src="/lib/reading_progress/reading_progress.js"></script>


  


  <script src="/js/utils.js?v=7.1.2"></script>

  <script src="/js/motion.js?v=7.1.2"></script>



  
  


  <script src="/js/affix.js?v=7.1.2"></script>

  <script src="/js/schemes/pisces.js?v=7.1.2"></script>



  
  <script src="/js/scrollspy.js?v=7.1.2"></script>
<script src="/js/post-details.js?v=7.1.2"></script>



  


  <script src="/js/next-boot.js?v=7.1.2"></script>


  
  <script src="/js/js.cookie.js?v=7.1.2"></script>
  <script src="/js/scroll-cookie.js?v=7.1.2"></script>


  
  <script src="/js/exturl.js?v=7.1.2"></script>


  

  
  

<script src="//cdn1.lncld.net/static/js/3.11.1/av-min.js"></script>



<script src="//unpkg.com/valine/dist/Valine.min.js"></script>

<script>
  var GUEST = ['nick', 'mail', 'link'];
  var guest = 'nick,mail,link';
  guest = guest.split(',').filter(function(item) {
    return GUEST.indexOf(item) > -1;
  });
  new Valine({
    el: '#comments',
    verify: true,
    notify: true,
    appId: '1N5rpk874DGudJw2wCL9J011-gzGzoHsz',
    appKey: '9Y83e6suJgx567wtxhKy45IN',
    placeholder: 'Just go go',
    avatar: 'mm',
    meta: guest,
    pageSize: '10' || 10,
    visitor: true,
    lang: 'zk-cn' || 'zh-cn'
  });
</script>




  
    <script>
  window.livereOptions = {
    refer: 'Apache Flink 零基础入门（三）：DataStream API 编程.html'
  };
  (function(d, s) {
    var j, e = d.getElementsByTagName(s)[0];
    if (typeof LivereTower === 'function') { return; }
    j = d.createElement(s);
    j.src = 'https://cdn-city.livere.com/js/embed.dist.js';
    j.async = true;
    e.parentNode.insertBefore(j, e);
  })(document, 'script');
</script>

  


  
  <script>
    // Popup Window;
    var isfetched = false;
    var isXml = true;
    // Search DB path;
    var search_path = "search.xml";
    if (search_path.length === 0) {
      search_path = "search.xml";
    } else if (/json$/i.test(search_path)) {
      isXml = false;
    }
    var path = "/" + search_path;
    // monitor main search box;

    var onPopupClose = function (e) {
      $('.popup').hide();
      $('#local-search-input').val('');
      $('.search-result-list').remove();
      $('#no-result').remove();
      $(".local-search-pop-overlay").remove();
      $('body').css('overflow', '');
    }

    function proceedsearch() {
      $("body")
        .append('<div class="search-popup-overlay local-search-pop-overlay"></div>')
        .css('overflow', 'hidden');
      $('.search-popup-overlay').click(onPopupClose);
      $('.popup').toggle();
      var $localSearchInput = $('#local-search-input');
      $localSearchInput.attr("autocapitalize", "none");
      $localSearchInput.attr("autocorrect", "off");
      $localSearchInput.focus();
    }

    // search function;
    var searchFunc = function(path, search_id, content_id) {
      'use strict';

      // start loading animation
      $("body")
        .append('<div class="search-popup-overlay local-search-pop-overlay">' +
          '<div id="search-loading-icon">' +
          '<i class="fa fa-spinner fa-pulse fa-5x fa-fw"></i>' +
          '</div>' +
          '</div>')
        .css('overflow', 'hidden');
      $("#search-loading-icon").css('margin', '20% auto 0 auto').css('text-align', 'center');

      

      $.ajax({
        url: path,
        dataType: isXml ? "xml" : "json",
        async: true,
        success: function(res) {
          // get the contents from search data
          isfetched = true;
          $('.popup').detach().appendTo('.header-inner');
          var datas = isXml ? $("entry", res).map(function() {
            return {
              title: $("title", this).text(),
              content: $("content",this).text(),
              url: $("url" , this).text()
            };
          }).get() : res;
          var input = document.getElementById(search_id);
          var resultContent = document.getElementById(content_id);
          var inputEventFunction = function() {
            var searchText = input.value.trim().toLowerCase();
            var keywords = searchText.split(/[\s\-]+/);
            if (keywords.length > 1) {
              keywords.push(searchText);
            }
            var resultItems = [];
            if (searchText.length > 0) {
              // perform local searching
              datas.forEach(function(data) {
                var isMatch = false;
                var hitCount = 0;
                var searchTextCount = 0;
                var title = data.title.trim();
                var titleInLowerCase = title.toLowerCase();
                var content = data.content.trim().replace(/<[^>]+>/g,"");
                
                var contentInLowerCase = content.toLowerCase();
                var articleUrl = decodeURIComponent(data.url).replace(/\/{2,}/g, '/');
                var indexOfTitle = [];
                var indexOfContent = [];
                // only match articles with not empty titles
                if(title != '') {
                  keywords.forEach(function(keyword) {
                    function getIndexByWord(word, text, caseSensitive) {
                      var wordLen = word.length;
                      if (wordLen === 0) {
                        return [];
                      }
                      var startPosition = 0, position = [], index = [];
                      if (!caseSensitive) {
                        text = text.toLowerCase();
                        word = word.toLowerCase();
                      }
                      while ((position = text.indexOf(word, startPosition)) > -1) {
                        index.push({position: position, word: word});
                        startPosition = position + wordLen;
                      }
                      return index;
                    }

                    indexOfTitle = indexOfTitle.concat(getIndexByWord(keyword, titleInLowerCase, false));
                    indexOfContent = indexOfContent.concat(getIndexByWord(keyword, contentInLowerCase, false));
                  });
                  if (indexOfTitle.length > 0 || indexOfContent.length > 0) {
                    isMatch = true;
                    hitCount = indexOfTitle.length + indexOfContent.length;
                  }
                }

                // show search results

                if (isMatch) {
                  // sort index by position of keyword

                  [indexOfTitle, indexOfContent].forEach(function (index) {
                    index.sort(function (itemLeft, itemRight) {
                      if (itemRight.position !== itemLeft.position) {
                        return itemRight.position - itemLeft.position;
                      } else {
                        return itemLeft.word.length - itemRight.word.length;
                      }
                    });
                  });

                  // merge hits into slices

                  function mergeIntoSlice(text, start, end, index) {
                    var item = index[index.length - 1];
                    var position = item.position;
                    var word = item.word;
                    var hits = [];
                    var searchTextCountInSlice = 0;
                    while (position + word.length <= end && index.length != 0) {
                      if (word === searchText) {
                        searchTextCountInSlice++;
                      }
                      hits.push({position: position, length: word.length});
                      var wordEnd = position + word.length;

                      // move to next position of hit

                      index.pop();
                      while (index.length != 0) {
                        item = index[index.length - 1];
                        position = item.position;
                        word = item.word;
                        if (wordEnd > position) {
                          index.pop();
                        } else {
                          break;
                        }
                      }
                    }
                    searchTextCount += searchTextCountInSlice;
                    return {
                      hits: hits,
                      start: start,
                      end: end,
                      searchTextCount: searchTextCountInSlice
                    };
                  }

                  var slicesOfTitle = [];
                  if (indexOfTitle.length != 0) {
                    slicesOfTitle.push(mergeIntoSlice(title, 0, title.length, indexOfTitle));
                  }

                  var slicesOfContent = [];
                  while (indexOfContent.length != 0) {
                    var item = indexOfContent[indexOfContent.length - 1];
                    var position = item.position;
                    var word = item.word;
                    // cut out 100 characters
                    var start = position - 20;
                    var end = position + 80;
                    if(start < 0){
                      start = 0;
                    }
                    if (end < position + word.length) {
                      end = position + word.length;
                    }
                    if(end > content.length){
                      end = content.length;
                    }
                    slicesOfContent.push(mergeIntoSlice(content, start, end, indexOfContent));
                  }

                  // sort slices in content by search text's count and hits' count

                  slicesOfContent.sort(function (sliceLeft, sliceRight) {
                    if (sliceLeft.searchTextCount !== sliceRight.searchTextCount) {
                      return sliceRight.searchTextCount - sliceLeft.searchTextCount;
                    } else if (sliceLeft.hits.length !== sliceRight.hits.length) {
                      return sliceRight.hits.length - sliceLeft.hits.length;
                    } else {
                      return sliceLeft.start - sliceRight.start;
                    }
                  });

                  // select top N slices in content

                  var upperBound = parseInt('1');
                  if (upperBound >= 0) {
                    slicesOfContent = slicesOfContent.slice(0, upperBound);
                  }

                  // highlight title and content

                  function highlightKeyword(text, slice) {
                    var result = '';
                    var prevEnd = slice.start;
                    slice.hits.forEach(function (hit) {
                      result += text.substring(prevEnd, hit.position);
                      var end = hit.position + hit.length;
                      result += '<b class="search-keyword">' + text.substring(hit.position, end) + '</b>';
                      prevEnd = end;
                    });
                    result += text.substring(prevEnd, slice.end);
                    return result;
                  }

                  var resultItem = '';

                  if (slicesOfTitle.length != 0) {
                    resultItem += "<li><a href='" + articleUrl + "' class='search-result-title'>" + highlightKeyword(title, slicesOfTitle[0]) + "</a>";
                  } else {
                    resultItem += "<li><a href='" + articleUrl + "' class='search-result-title'>" + title + "</a>";
                  }

                  slicesOfContent.forEach(function (slice) {
                    resultItem += "<a href='" + articleUrl + "'>" +
                      "<p class=\"search-result\">" + highlightKeyword(content, slice) +
                      "...</p>" + "</a>";
                  });

                  resultItem += "</li>";
                  resultItems.push({
                    item: resultItem,
                    searchTextCount: searchTextCount,
                    hitCount: hitCount,
                    id: resultItems.length
                  });
                }
              })
            };
            if (keywords.length === 1 && keywords[0] === "") {
              resultContent.innerHTML = '<div id="no-result"><i class="fa fa-search fa-5x"></i></div>'
            } else if (resultItems.length === 0) {
              resultContent.innerHTML = '<div id="no-result"><i class="fa fa-frown-o fa-5x"></i></div>'
            } else {
              resultItems.sort(function (resultLeft, resultRight) {
                if (resultLeft.searchTextCount !== resultRight.searchTextCount) {
                  return resultRight.searchTextCount - resultLeft.searchTextCount;
                } else if (resultLeft.hitCount !== resultRight.hitCount) {
                  return resultRight.hitCount - resultLeft.hitCount;
                } else {
                  return resultRight.id - resultLeft.id;
                }
              });
              var searchResultList = '<ul class=\"search-result-list\">';
              resultItems.forEach(function (result) {
                searchResultList += result.item;
              })
              searchResultList += "</ul>";
              resultContent.innerHTML = searchResultList;
            }
          }

          if ('auto' === 'auto') {
            input.addEventListener('input', inputEventFunction);
          } else {
            $('.search-icon').click(inputEventFunction);
            input.addEventListener('keypress', function (event) {
              if (event.keyCode === 13) {
                inputEventFunction();
              }
            });
          }

          // remove loading animation
          $(".local-search-pop-overlay").remove();
          $('body').css('overflow', '');

          proceedsearch();
        }
      });
    }

    // handle and trigger popup window;
    $('.popup-trigger').click(function(e) {
      e.stopPropagation();
      if (isfetched === false) {
        searchFunc(path, 'local-search-input', 'local-search-result');
      } else {
        proceedsearch();
      };
    });

    $('.popup-btn-close').click(onPopupClose);
    $('.popup').click(function(e){
      e.stopPropagation();
    });
    $(document).on('keyup', function (event) {
      var shouldDismissSearchPopup = event.which === 27 &&
        $('.search-popup').is(':visible');
      if (shouldDismissSearchPopup) {
        onPopupClose();
      }
    });
  </script>





  

  
  <script src="https://www.gstatic.com/firebasejs/4.6.0/firebase.js"></script>
  <script src="https://www.gstatic.com/firebasejs/4.6.0/firebase-firestore.js"></script>
  
    <script src="https://cdnjs.cloudflare.com/ajax/libs/bluebird/3.5.1/bluebird.core.min.js"></script>
  
  <script>
    (function () {

      firebase.initializeApp({
        apiKey: '',
        projectId: ''
      })

      function getCount(doc, increaseCount) {
        //increaseCount will be false when not in article page

        return doc.get().then(function (d) {
          var count
          if (!d.exists) { //has no data, initialize count
            if (increaseCount) {
              doc.set({
                count: 1
              })
              count = 1
            }
            else {
              count = 0
            }
          }
          else { //has data
            count = d.data().count
            if (increaseCount) {
              if (!(window.localStorage && window.localStorage.getItem(title))) { //if first view this article
                doc.set({ //increase count
                  count: count + 1
                })
                count++
              }
            }
          }
          if (window.localStorage && increaseCount) { //mark as visited
            localStorage.setItem(title, true)
          }

          return count
        })
      }

      function appendCountTo(el) {
        return function (count) {
          $(el).append(
            $('<span>').addClass('post-visitors-count').append(
              $('<span>').addClass('post-meta-divider').text('|')
            ).append(
              $('<span>').addClass('post-meta-item-icon').append(
                $('<i>').addClass('fa fa-users')
              )
              ).append($('<span>').text('阅读次数 ' + count))
          )
        }
      }

      var db = firebase.firestore()
      var articles = db.collection('articles')

      //https://hexo.io/docs/variables.html
      var isPost = 'Apache Flink 零基础入门（三）：DataStream API 编程'.length > 0
      var isArchive = '' === 'true'
      var isCategory = ''.length > 0
      var isTag = ''.length > 0

      if (isPost) { //is article page
        var title = 'Apache Flink 零基础入门（三）：DataStream API 编程'
        var doc = articles.doc(title)

        getCount(doc, true).then(appendCountTo($('.post-meta')))
      }
      else if (!isArchive && !isCategory && !isTag) { //is index page
        var titles = [] //array to titles

        var postsstr = '' //if you have a better way to get titles of posts, please change it
        eval(postsstr)

        var promises = titles.map(function (title) {
          return articles.doc(title)
        }).map(function (doc) {
          return getCount(doc)
        })
        Promise.all(promises).then(function (counts) {
          var metas = $('.post-meta')
          counts.forEach(function (val, idx) {
            appendCountTo(metas[idx])(val)
          })
        })
      }
    })()
  </script>


  
  

  
  

  


  
<script>
if ($('body').find('div.pdf').length) {
  $.ajax({
    type: 'GET',
    url: '//cdn.jsdelivr.net/npm/pdfobject@2/pdfobject.min.js',
    dataType: 'script',
    cache: true,
    success: function() {
      $('body').find('div.pdf').each(function(i, o) {
        PDFObject.embed($(o).attr('target'), $(o), {
          pdfOpenParams: {
            navpanes: 0,
            toolbar: 0,
            statusbar: 0,
            pagemode: 'thumbs',
            view: 'FitH'
          },
          PDFJS_URL: '/lib/pdf/web/viewer.html',
          height: $(o).attr('height') || '500px'
        });
      });
    },
  });
}
</script>


  
<script>
if ($('body').find('pre.mermaid').length) {
  $.ajax({
    type: 'GET',
    url: '//cdn.jsdelivr.net/npm/mermaid@8/dist/mermaid.min.js',
    dataType: 'script',
    cache: true,
    success: function() {
      mermaid.initialize({
        theme: 'dark',
        logLevel: 3,
        flowchart: { curve: 'linear' },
        gantt: { axisFormat: '%m/%d/%Y' },
        sequence: { actorMargin: 50 }
      });
    }
  });
}
</script>


  
  <script>
    (function(){
      var bp = document.createElement('script');
      var curProtocol = window.location.protocol.split(':')[0];
      bp.src = (curProtocol === 'https') ? 'https://zz.bdstatic.com/linksubmit/push.js' : 'http://push.zhanzhang.baidu.com/push.js';
      var s = document.getElementsByTagName("script")[0];
      s.parentNode.insertBefore(bp, s);
    })();
  </script>


  

  

  

  

  
  
  
  <script src="/lib/bookmark/bookmark.min.js?v=1.0"></script>
  <script>
  
    bookmark.scrollToMark('auto', "#更多");
  
  </script>


  
<script>
  $('.highlight').not('.gist .highlight').each(function(i, e) {
    var $wrap = $('<div>').addClass('highlight-wrap');
    $(e).after($wrap);
    $wrap.append($('<button>').addClass('copy-btn').append('复制').on('click', function(e) {
      var code = $(this).parent().find('.code').find('.line').map(function(i, e) {
        return $(e).text();
      }).toArray().join('\n');
      var ta = document.createElement('textarea');
      var yPosition = window.pageYOffset || document.documentElement.scrollTop;
      ta.style.top = yPosition + 'px'; // Prevent page scroll
      ta.style.position = 'absolute';
      ta.style.opacity = '0';
      ta.readOnly = true;
      ta.value = code;
      document.body.appendChild(ta);
      const selection = document.getSelection();
      const selected = selection.rangeCount > 0 ? selection.getRangeAt(0) : false;
      ta.select();
      ta.setSelectionRange(0, code.length);
      ta.readOnly = false;
      var result = document.execCommand('copy');
      
        if (result) $(this).text('复制成功');
        else $(this).text('复制失败');
      
      ta.blur(); // For iOS
      $(this).blur();
      if (selected) {
        selection.removeAllRanges();
        selection.addRange(selected);
      }
    })).on('mouseleave', function(e) {
      var $b = $(this).find('.copy-btn');
      setTimeout(function() {
        $b.text('复制');
      }, 300);
    }).append(e);
  })
</script>


  

  

</body>
</html>
